---
title: "A2A 任务委派 | A2A 协议 | Kastrax 文档"
description: "A2A 协议的任务委派机制，包括任务创建、执行、取消和状态跟踪。"
---

# A2A 任务委派 ✅

## 任务委派概述 ✅

任务委派是 A2A 协议的核心功能之一，允许代理将任务委派给其他代理执行。通过任务委派，代理可以利用其他代理的专业能力，实现复杂的多代理协作。

Kastrax 实现了全面的 A2A 任务委派机制，包括任务创建、执行、取消和状态跟踪，支持构建灵活的多代理系统。

## 任务模型 ✅

在 Kastrax 的 A2A 实现中，任务由以下组件组成：

### 任务 (Task) ✅

任务是代理之间委派的工作单元，包含以下属性：

```kotlin
/**
 * 任务，代表代理之间委派的工作单元
 */
data class Task(
    /**
     * 任务 ID
     */
    val id: String,
    
    /**
     * 会话 ID
     */
    val sessionId: String? = null,
    
    /**
     * 任务状态
     */
    val status: TaskStatus,
    
    /**
     * 任务历史
     */
    val history: List<Message>,
    
    /**
     * 任务产物
     */
    val artifacts: List<Artifact> = emptyList(),
    
    /**
     * 任务元数据
     */
    val metadata: Map<String, JsonElement> = emptyMap()
)
```

### 任务状态 (TaskStatus) ✅

任务状态表示任务的当前状态，包含以下属性：

```kotlin
/**
 * 任务状态，表示任务的当前状态
 */
data class TaskStatus(
    /**
     * 任务状态
     */
    val state: TaskState,
    
    /**
     * 任务进度
     */
    val progress: Float = 0f,
    
    /**
     * 任务错误
     */
    val error: String? = null,
    
    /**
     * 任务开始时间
     */
    val startedAt: Long? = null,
    
    /**
     * 任务完成时间
     */
    val completedAt: Long? = null
)

/**
 * 任务状态枚举
 */
enum class TaskState {
    /**
     * 已提交
     */
    SUBMITTED,
    
    /**
     * 正在处理
     */
    PROCESSING,
    
    /**
     * 已完成
     */
    COMPLETED,
    
    /**
     * 已失败
     */
    FAILED,
    
    /**
     * 已取消
     */
    CANCELLED
}
```

### 消息 (Message) ✅

消息是任务中的通信单元，包含以下属性：

```kotlin
/**
 * 消息，表示任务中的通信单元
 */
data class Message(
    /**
     * 消息 ID
     */
    val id: String = UUID.randomUUID().toString(),
    
    /**
     * 消息角色
     */
    val role: MessageRole = MessageRole.USER,
    
    /**
     * 消息部分
     */
    val parts: List<MessagePart>,
    
    /**
     * 消息元数据
     */
    val metadata: Map<String, JsonElement> = emptyMap()
)

/**
 * 消息角色枚举
 */
enum class MessageRole {
    /**
     * 用户
     */
    USER,
    
    /**
     * 助手
     */
    ASSISTANT,
    
    /**
     * 系统
     */
    SYSTEM
}
```

### 消息部分 (MessagePart) ✅

消息部分是消息的组成部分，可以是文本、图像等：

```kotlin
/**
 * 消息部分，表示消息的组成部分
 */
sealed class MessagePart

/**
 * 文本部分
 */
data class TextPart(
    /**
     * 文本内容
     */
    val text: String
) : MessagePart()

/**
 * 图像部分
 */
data class ImagePart(
    /**
     * 图像 URL
     */
    val url: String,
    
    /**
     * 图像 MIME 类型
     */
    val mimeType: String = "image/jpeg"
) : MessagePart()
```

### 任务产物 (Artifact) ✅

任务产物是任务执行的结果，可以是文本、图像等：

```kotlin
/**
 * 任务产物，表示任务执行的结果
 */
data class Artifact(
    /**
     * 产物名称
     */
    val name: String,
    
    /**
     * 产物部分
     */
    val parts: List<MessagePart>,
    
    /**
     * 产物元数据
     */
    val metadata: Map<String, JsonElement> = emptyMap()
)
```

## 任务管理器 ✅

Kastrax 提供了任务管理器，用于管理任务的创建、执行、取消和状态跟踪：

```kotlin
/**
 * 任务管理器，用于管理任务的创建、执行、取消和状态跟踪
 */
class TaskManager(
    private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default)
) {
    /**
     * 任务映射
     */
    private val tasks = ConcurrentHashMap<String, Task>()
    
    /**
     * 任务状态映射
     */
    private val taskStatuses = ConcurrentHashMap<String, MutableStateFlow<TaskStatus>>()
    
    /**
     * 任务事件映射
     */
    private val taskEvents = ConcurrentHashMap<String, MutableSharedFlow<TaskEvent>>()
    
    /**
     * 任务推送通知配置映射
     */
    private val taskPushNotifications = ConcurrentHashMap<String, PushNotificationConfig>()
    
    /**
     * 创建任务
     */
    fun createTask(message: Message, sessionId: String? = null): Task {
        val taskId = UUID.randomUUID().toString()
        val task = Task(
            id = taskId,
            sessionId = sessionId,
            status = TaskStatus(state = TaskState.SUBMITTED),
            history = listOf(message)
        )
        
        tasks[taskId] = task
        taskStatuses[taskId] = MutableStateFlow(task.status)
        taskEvents[taskId] = MutableSharedFlow(replay = 0, extraBufferCapacity = 100)
        
        return task
    }
    
    /**
     * 获取任务
     */
    fun getTask(taskId: String): Task? {
        return tasks[taskId]
    }
    
    /**
     * 获取任务状态
     */
    fun getTaskStatus(taskId: String): TaskStatus? {
        return taskStatuses[taskId]?.value
    }
    
    /**
     * 获取任务状态流
     */
    fun getTaskStatusFlow(taskId: String): Flow<TaskStatus>? {
        return taskStatuses[taskId]
    }
    
    /**
     * 获取任务事件流
     */
    fun getTaskEventFlow(taskId: String): Flow<TaskEvent>? {
        return taskEvents[taskId]
    }
    
    /**
     * 更新任务状态
     */
    fun updateTaskStatus(taskId: String, status: TaskStatus) {
        val task = tasks[taskId] ?: return
        val updatedTask = task.copy(status = status)
        tasks[taskId] = updatedTask
        taskStatuses[taskId]?.value = status
        
        // 发送任务事件
        scope.launch {
            val event = when (status.state) {
                TaskState.PROCESSING -> TaskEvent.TaskStarted(taskId)
                TaskState.COMPLETED -> TaskEvent.TaskCompleted(taskId)
                TaskState.FAILED -> TaskEvent.TaskFailed(taskId, status.error)
                TaskState.CANCELLED -> TaskEvent.TaskCancelled(taskId)
                else -> null
            }
            
            if (event != null) {
                taskEvents[taskId]?.emit(event)
                
                // 发送推送通知
                taskPushNotifications[taskId]?.let { config ->
                    if (config.events.contains(event.type) || config.events.contains("*")) {
                        sendPushNotification(config, event)
                    }
                }
            }
        }
    }
    
    /**
     * 添加任务消息
     */
    fun addTaskMessage(taskId: String, message: Message) {
        val task = tasks[taskId] ?: return
        val updatedTask = task.copy(
            history = task.history + message
        )
        tasks[taskId] = updatedTask
        
        // 发送任务事件
        scope.launch {
            val event = TaskEvent.MessageAdded(taskId, message)
            taskEvents[taskId]?.emit(event)
            
            // 发送推送通知
            taskPushNotifications[taskId]?.let { config ->
                if (config.events.contains("message.added") || config.events.contains("*")) {
                    sendPushNotification(config, event)
                }
            }
        }
    }
    
    /**
     * 添加任务产物
     */
    fun addTaskArtifact(taskId: String, artifact: Artifact) {
        val task = tasks[taskId] ?: return
        val updatedTask = task.copy(
            artifacts = task.artifacts + artifact
        )
        tasks[taskId] = updatedTask
        
        // 发送任务事件
        scope.launch {
            val event = TaskEvent.ArtifactAdded(taskId, artifact)
            taskEvents[taskId]?.emit(event)
            
            // 发送推送通知
            taskPushNotifications[taskId]?.let { config ->
                if (config.events.contains("artifact.added") || config.events.contains("*")) {
                    sendPushNotification(config, event)
                }
            }
        }
    }
    
    /**
     * 取消任务
     */
    fun cancelTask(taskId: String) {
        val task = tasks[taskId] ?: return
        if (task.status.state == TaskState.COMPLETED || task.status.state == TaskState.FAILED) {
            return
        }
        
        updateTaskStatus(
            taskId = taskId,
            status = TaskStatus(
                state = TaskState.CANCELLED,
                completedAt = System.currentTimeMillis()
            )
        )
    }
    
    /**
     * 设置任务推送通知
     */
    fun setTaskPushNotification(taskId: String, config: PushNotificationConfig) {
        taskPushNotifications[taskId] = config
    }
    
    /**
     * 处理任务
     */
    suspend fun processTask(agent: A2AAgent, task: Task): Flow<TaskEvent> = flow {
        // 更新任务状态为处理中
        updateTaskStatus(
            taskId = task.id,
            status = TaskStatus(
                state = TaskState.PROCESSING,
                startedAt = System.currentTimeMillis()
            )
        )
        
        emit(TaskEvent.TaskStarted(task.id))
        
        try {
            // 获取最后一条消息
            val message = task.history.lastOrNull()
            
            if (message != null) {
                // 创建调用请求
                val request = InvokeRequest(
                    id = UUID.randomUUID().toString(),
                    capabilityId = "process_task", // 假设代理有一个处理任务的能力
                    parameters = mapOf(
                        "prompt" to JsonPrimitive(getPromptFromMessage(message)),
                        "task_id" to JsonPrimitive(task.id)
                    )
                )
                
                // 调用代理处理任务
                val response = agent.invoke(request)
                
                // 创建任务产物
                val artifact = Artifact(
                    name = "result",
                    parts = listOf(
                        TextPart(
                            text = response.result.toString()
                        )
                    )
                )
                
                // 添加任务产物
                addTaskArtifact(task.id, artifact)
                
                emit(TaskEvent.ArtifactAdded(task.id, artifact))
                
                // 创建助手消息
                val assistantMessage = Message(
                    role = MessageRole.ASSISTANT,
                    parts = artifact.parts
                )
                
                // 添加助手消息
                addTaskMessage(task.id, assistantMessage)
                
                emit(TaskEvent.MessageAdded(task.id, assistantMessage))
                
                // 更新任务状态为已完成
                updateTaskStatus(
                    taskId = task.id,
                    status = TaskStatus(
                        state = TaskState.COMPLETED,
                        progress = 1f,
                        completedAt = System.currentTimeMillis()
                    )
                )
                
                emit(TaskEvent.TaskCompleted(task.id))
            } else {
                // 如果没有消息，则更新任务状态为失败
                updateTaskStatus(
                    taskId = task.id,
                    status = TaskStatus(
                        state = TaskState.FAILED,
                        error = "No message found in task",
                        completedAt = System.currentTimeMillis()
                    )
                )
                
                emit(TaskEvent.TaskFailed(task.id, "No message found in task"))
            }
        } catch (e: Exception) {
            // 更新任务状态为失败
            updateTaskStatus(
                taskId = task.id,
                status = TaskStatus(
                    state = TaskState.FAILED,
                    error = e.message,
                    completedAt = System.currentTimeMillis()
                )
            )
            
            emit(TaskEvent.TaskFailed(task.id, e.message))
        }
    }
    
    /**
     * 从消息中获取提示
     */
    private fun getPromptFromMessage(message: Message): String {
        return message.parts
            .filterIsInstance<TextPart>()
            .joinToString("\n") { it.text }
    }
    
    /**
     * 发送推送通知
     */
    private fun sendPushNotification(config: PushNotificationConfig, event: Any) {
        // 在实际实现中，这里应该使用 HTTP 客户端发送通知
        // 为了简化，这里只是打印日志
        println("Sending push notification to ${config.url}: $event")
    }
    
    /**
     * 处理发送任务请求
     */
    suspend fun onSendTask(agent: A2AAgent, params: TaskSendParams): Task {
        // 创建任务
        val task = createTask(params.message, params.sessionId)
        
        // 如果配置了推送通知，则设置推送通知
        params.pushNotification?.let { config ->
            setTaskPushNotification(task.id, config)
        }
        
        // 启动协程处理任务
        scope.launch {
            processTask(agent, task)
        }
        
        return task
    }
    
    /**
     * 处理发送任务并订阅更新请求
     */
    suspend fun onSendTaskSubscribe(agent: A2AAgent, params: TaskSendParams): Flow<Any> = flow {
        // 创建任务
        val task = createTask(params.message, params.sessionId)
        
        // 如果配置了推送通知，则设置推送通知
        params.pushNotification?.let { config ->
            setTaskPushNotification(task.id, config)
        }
        
        // 处理任务并发送更新
        processTask(agent, task).collect { event ->
            emit(event)
        }
    }
}
```

## 任务委派流程 ✅

Kastrax 的 A2A 实现支持多种任务委派流程：

### 1. 同步任务委派 ✅

同步任务委派是最简单的委派方式，代理直接调用另一个代理的能力，并等待结果：

```kotlin
// 创建调用请求
val request = InvokeRequest(
    id = UUID.randomUUID().toString(),
    capabilityId = "data_analysis",
    parameters = mapOf(
        "dataset_url" to JsonPrimitive("https://example.com/data.csv"),
        "analysis_type" to JsonPrimitive("statistical")
    )
)

// 调用代理能力
val response = agent.invoke(request)

// 处理响应
println("分析结果：${response.result}")
```

### 2. 异步任务委派 ✅

异步任务委派允许代理创建任务，然后稍后检查结果：

```kotlin
// 创建任务
val message = Message(
    parts = listOf(
        TextPart(
            text = "分析数据集 https://example.com/data.csv 并生成统计报告"
        )
    )
)

val task = taskManager.createTask(message)

// 启动协程处理任务
scope.launch {
    taskManager.processTask(agent, task)
}

// 稍后检查任务状态
val taskStatus = taskManager.getTaskStatus(task.id)
if (taskStatus?.state == TaskState.COMPLETED) {
    // 获取任务产物
    val artifacts = taskManager.getTask(task.id)?.artifacts
    println("分析结果：${artifacts?.firstOrNull()?.parts?.filterIsInstance<TextPart>()?.firstOrNull()?.text}")
}
```

### 3. 推送通知 ✅

推送通知允许代理在任务状态变化时接收通知：

```kotlin
// 创建任务
val message = Message(
    parts = listOf(
        TextPart(
            text = "分析数据集 https://example.com/data.csv 并生成统计报告"
        )
    )
)

// 设置推送通知
val pushNotification = PushNotificationConfig(
    url = "https://example.com/webhook",
    events = listOf("task.completed", "task.failed")
)

// 发送任务
val task = taskManager.onSendTask(
    agent = agent,
    params = TaskSendParams(
        message = message,
        pushNotification = pushNotification
    )
)
```

### 4. 流式响应 ✅

流式响应允许代理实时接收任务更新：

```kotlin
// 创建任务
val message = Message(
    parts = listOf(
        TextPart(
            text = "分析数据集 https://example.com/data.csv 并生成统计报告"
        )
    )
)

// 发送任务并订阅更新
taskManager.onSendTaskSubscribe(
    agent = agent,
    params = TaskSendParams(
        message = message
    )
).collect { event ->
    when (event) {
        is TaskEvent.TaskStarted -> println("任务已开始：${event.taskId}")
        is TaskEvent.MessageAdded -> println("消息已添加：${event.message.parts.filterIsInstance<TextPart>().firstOrNull()?.text}")
        is TaskEvent.ArtifactAdded -> println("产物已添加：${event.artifact.parts.filterIsInstance<TextPart>().firstOrNull()?.text}")
        is TaskEvent.TaskCompleted -> println("任务已完成：${event.taskId}")
        is TaskEvent.TaskFailed -> println("任务已失败：${event.error}")
    }
}
```

## 任务委派 API ✅

Kastrax 的 A2A 实现提供了标准的任务委派 API，包括以下端点：

### 1. 发送任务端点 ✅

发送任务端点允许代理创建任务：

```
POST /api/tasks
```

请求示例：

```json
{
  "message": {
    "role": "USER",
    "parts": [
      {
        "text": "分析数据集 https://example.com/data.csv 并生成统计报告"
      }
    ]
  },
  "session_id": "user-session-1",
  "push_notification": {
    "url": "https://example.com/webhook",
    "events": ["task.completed", "task.failed"]
  }
}
```

响应示例：

```json
{
  "task": {
    "id": "task-123",
    "session_id": "user-session-1",
    "status": {
      "state": "SUBMITTED"
    },
    "history": [
      {
        "id": "msg-1",
        "role": "USER",
        "parts": [
          {
            "text": "分析数据集 https://example.com/data.csv 并生成统计报告"
          }
        ]
      }
    ]
  }
}
```

### 2. 获取任务端点 ✅

获取任务端点允许代理获取任务信息：

```
GET /api/tasks/{taskId}
```

响应示例：

```json
{
  "task": {
    "id": "task-123",
    "session_id": "user-session-1",
    "status": {
      "state": "COMPLETED",
      "progress": 1.0,
      "started_at": 1620000000000,
      "completed_at": 1620000010000
    },
    "history": [
      {
        "id": "msg-1",
        "role": "USER",
        "parts": [
          {
            "text": "分析数据集 https://example.com/data.csv 并生成统计报告"
          }
        ]
      },
      {
        "id": "msg-2",
        "role": "ASSISTANT",
        "parts": [
          {
            "text": "数据集分析报告：\n\n平均值：42.5\n中位数：40.0\n标准差：10.2"
          }
        ]
      }
    ],
    "artifacts": [
      {
        "name": "result",
        "parts": [
          {
            "text": "数据集分析报告：\n\n平均值：42.5\n中位数：40.0\n标准差：10.2"
          }
        ]
      }
    ]
  }
}
```

### 3. 获取任务状态端点 ✅

获取任务状态端点允许代理获取任务状态：

```
GET /api/tasks/{taskId}/status
```

响应示例：

```json
{
  "status": {
    "state": "COMPLETED",
    "progress": 1.0,
    "started_at": 1620000000000,
    "completed_at": 1620000010000
  }
}
```

### 4. 取消任务端点 ✅

取消任务端点允许代理取消任务：

```
POST /api/tasks/{taskId}/cancel
```

响应示例：

```json
{
  "success": true,
  "task_id": "task-123",
  "status": {
    "state": "CANCELLED",
    "completed_at": 1620000020000
  }
}
```

### 5. 订阅任务更新端点 ✅

订阅任务更新端点允许代理实时接收任务更新：

```
GET /api/tasks/{taskId}/subscribe
```

响应示例（Server-Sent Events）：

```
event: task.started
data: {"task_id":"task-123"}

event: message.added
data: {"task_id":"task-123","message":{"id":"msg-2","role":"ASSISTANT","parts":[{"text":"数据集分析报告：\n\n平均值：42.5\n中位数：40.0\n标准差：10.2"}]}}

event: artifact.added
data: {"task_id":"task-123","artifact":{"name":"result","parts":[{"text":"数据集分析报告：\n\n平均值：42.5\n中位数：40.0\n标准差：10.2"}]}}

event: task.completed
data: {"task_id":"task-123"}
```

## 任务委派示例 ✅

以下是一个完整的任务委派示例：

```kotlin
// 创建 A2A 实例
val a2aInstance = a2a {
    // 配置任务管理器
    taskManager {
        // 配置任务清理
        taskCleanup = true
        taskTtl = 3600000 // 1 小时
    }
    
    // 注册 A2A 代理
    a2aAgent {
        id = "data-analysis-agent"
        name = "数据分析代理"
        description = "提供数据分析和可视化能力的代理"
        
        // 添加能力
        capability {
            id = "data_analysis"
            name = "数据分析"
            description = "分析提供的数据集并返回统计结果"
            
            // 添加参数
            parameter {
                name = "dataset_url"
                type = "string"
                description = "数据集URL"
                required = true
            }
            
            parameter {
                name = "analysis_type"
                type = "string"
                description = "分析类型"
                required = true
            }
            
            returnType = "json"
        }
        
        // 配置认证
        authentication {
            type = AuthType.API_KEY
        }
    }
    
    // 配置服务器
    server {
        port = 8080
        enableCors = true
    }
}

// 启动 A2A 实例
a2aInstance.start()

// 获取任务管理器
val taskManager = a2aInstance.getTaskManager()

// 获取代理
val agent = a2aInstance.getAgent("data-analysis-agent")

// 创建任务
val message = Message(
    parts = listOf(
        TextPart(
            text = "分析数据集 https://example.com/data.csv 并生成统计报告"
        )
    )
)

// 设置推送通知
val pushNotification = PushNotificationConfig(
    url = "https://example.com/webhook",
    events = listOf("task.completed", "task.failed")
)

// 发送任务
val task = taskManager.onSendTask(
    agent = agent,
    params = TaskSendParams(
        message = message,
        sessionId = "user-session-1",
        pushNotification = pushNotification
    )
)

println("任务已创建：${task.id}")

// 等待任务完成
while (true) {
    val taskStatus = taskManager.getTaskStatus(task.id)
    if (taskStatus?.state == TaskState.COMPLETED || taskStatus?.state == TaskState.FAILED) {
        break
    }
    Thread.sleep(1000)
}

// 获取任务结果
val updatedTask = taskManager.getTask(task.id)
if (updatedTask?.status?.state == TaskState.COMPLETED) {
    val artifacts = updatedTask.artifacts
    println("分析结果：${artifacts.firstOrNull()?.parts?.filterIsInstance<TextPart>()?.firstOrNull()?.text}")
} else {
    println("任务失败：${updatedTask?.status?.error}")
}

// 停止 A2A 实例
a2aInstance.stop()
```

## 总结 ✅

Kastrax 的 A2A 实现提供了全面的任务委派机制，包括任务创建、执行、取消和状态跟踪，支持构建灵活的多代理系统。通过任务委派，代理可以利用其他代理的专业能力，实现复杂的多代理协作。

## 下一步 ✅

了解了 A2A 任务委派后，您可以：

1. 学习 [A2A 代理发现](/docs/a2a/a2a-discovery.mdx)
2. 研究 [A2A 安全机制](/docs/a2a/a2a-security.mdx)
3. 探索 [多代理协作](/docs/a2a/a2a-collaboration.mdx)
4. 学习 [工作流引擎](/docs/a2a/a2a-workflow.mdx)
5. 研究 [A2A 协议规范](/docs/a2a/a2a-specification.mdx)
